package com.apobates.jforum.grief.schema2doc.reactive;

import com.apobates.jforum.grief.schema2doc.SchemaExportException;
import com.apobates.jforum.grief.schema2doc.output.freeMarker.ExportTemplate;
import com.apobates.jforum.grief.schema2doc.core.SchemaTableStrategy;
import com.apobates.jforum.grief.schema2doc.core.entity.Column;
import com.apobates.jforum.grief.schema2doc.core.entity.Information;
import com.apobates.jforum.grief.schema2doc.core.entity.Table;
import com.apobates.jforum.grief.schema2doc.reactive.out.ConsoleSchemaObservableOutput;
import io.reactivex.*;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
 * 反应式导出执行器
 */
public class ExportReactiveExecutor {
    // 输出
    private final SchemaObservableOutput output;
    // 待输出结果集
    private final SchemaQueryFlowableResult result;
    // 结果集的查询策略
    private final SchemaTableStrategy strategy;
    // 导出样式模板
    private final ExportTemplate template;
    // 操作的数据库名称
    // Oracle是没有这个概念的
    public final Optional<String> dbName;
    // 模式名称/Oracle和PostgreSQL
    // MySQL和SQLServer是没有这个概念的
    public final Optional<String> schema;
    // RxJava Subscribe.onComplete调用时的回调
    public final RxJavaCompleteHandler completeHandler;
    private final static Logger logger = LoggerFactory.getLogger(ExportReactiveExecutor.class);

    private ExportReactiveExecutor(
            SchemaObservableOutput output,
            SchemaQueryFlowableResult result,
            SchemaTableStrategy strategy,
            ExportTemplate template,
            RxJavaCompleteHandler completeHandler,
            Optional<String> dbName,
            Optional<String> schema) {
        this.output = output;
        this.result = result;
        this.strategy = strategy;
        this.template = template;
        this.completeHandler = completeHandler;
        this.dbName = dbName;
        this.schema = schema;
    }

    /**
     * 基于SchemaFlowableOutput执行导出
     * @param info 导出信息
     * @param out 导出目标
     *
     * @throws SchemaExportException
     */
    public void export(Information info, SchemaFlowableOutput out) throws SchemaExportException{
        long startStamp = System.currentTimeMillis();
        long finishStamp = 0;
        // Flowable
        Flowable<Table> tableFlowable = result.getAll(dbName, schema, strategy).flatMap(new Function<Table, Publisher<Table>>() {
            @Override
            public Publisher<Table> apply(@NonNull Table table) throws Exception {
                return result.getTableColumn(table).flatMap(new Function<List<Column>, SingleSource<Table>>() {
                    @Override
                    public SingleSource<Table> apply(@NonNull List<Column> columns) throws Exception {
                        return Single.just(table.fillColumn(columns));
                    }
                }).flatMapPublisher(new Function<Table, Publisher<? extends Table>>() {
                    @Override
                    public Publisher<? extends Table> apply(@NonNull Table table) throws Exception {
                        return Flowable.just(table);
                    }
                });
            }
        });
        Disposable disposable = null;
        try {
            disposable = out.flush(info, tableFlowable);
            CompletableFuture<String> future = out.getFuture();
            while(!future.isDone()){
                logger.info("[ERE-Flowable]未完成,线程休眠1秒");
                Thread.currentThread().sleep(1000,0);
            }
            String result = future.get();
            logger.info("[ERE-Flowable]完成, 结果:"+result);
            if(result.equals("OK")){
                finishStamp = System.currentTimeMillis();
                clearHander(disposable, "[ERE-Flowable]RxJava disposed because complete,WithTime: "+(finishStamp-startStamp));
            }
        }catch (Exception e){
            clearHander(disposable, "[ERE-Flowable]RxJava disposed has Exception: "+e.getMessage());
        }
    }
    /**
     * 基于SchemaObservableOutput执行导出
     * @param info 导出信息
     *
     * @throws SchemaExportException
     */
    public void export(Information info) throws SchemaExportException {
        long startStamp = System.currentTimeMillis();
        // RxJava2
        logger.info("Start Export");
        Observable<Table> observable = result.getAll(dbName, schema, strategy).toObservable().flatMap(new Function<Table, ObservableSource<Table>>() {
            @Override
            public ObservableSource<Table> apply(@NonNull Table table) throws Exception {
                return result.getTableColumn(table).toObservable().flatMap(new Function<List<Column>, ObservableSource<Table>>() {
                    @Override
                    public ObservableSource<Table> apply(@NonNull List<Column> columns) throws Exception {
                        return Observable.just(table.fillColumn(columns));
                    }
                });
            }
        });

        Disposable disposable = null;
        try {
            disposable = output.flush(info, observable);
            CompletableFuture<Integer> future= output.getFuture();
            while(!future.isDone()){
                logger.info("[ERE-Observable]未完成,线程休眠1秒");
                Thread.currentThread().sleep(1000,0);
            }
            Integer result = future.get();
            if(result.compareTo(0) == 1){
                long finishStamp = System.currentTimeMillis();
                clearHander(disposable, String.format("[ERE-Observable][%s]RxJava disposed because complete,  Affect Size:%d, WithTime: %d", output.getActor(), result.intValue(), (finishStamp-startStamp)));
            }
        }catch (Exception e){
            clearHander(disposable, String.format("[ERE-Observable][%s]RxJava disposed has Exception: %s", output.getActor(), e.getMessage()));
        }
    }
    private void clearHander(Disposable disposable, String reason){
        System.out.println(reason);
        logger.info(reason);
        if(null!=disposable && !disposable.isDisposed()) {
            disposable.dispose();
        }else{
            if(null != disposable) {
                logger.info("[CH]disposable status:" + disposable.isDisposed());
            }else{
                logger.info("[CH]disposable is null");
            }
        }
        completeHandler.apply();
    }
    /**
     * 创建导出设置向导
     * @param result
     * @return
     */
    public static Builder supply(SchemaQueryFlowableResult result){
        return new Builder(result);
    }

    /**
     * 默认退出回调
     */
    private static class DefaultExitCompleteHander implements RxJavaCompleteHandler{
        @Override
        public void apply() {
            System.exit(0);
        }
    }

    public static class Builder {
        // 输出
        private SchemaObservableOutput output;
        // 待输出结果集
        private SchemaQueryFlowableResult result;
        // 结果集的查询策略
        private SchemaTableStrategy strategy;
        // 导出样式模板
        private ExportTemplate template;
        // RxJava运行结束的回调
        private RxJavaCompleteHandler completeHandler;

        private Builder(SchemaQueryFlowableResult result) {
            this.result = result;
            this.strategy = SchemaTableStrategy.empty();
            this.output = new ConsoleSchemaObservableOutput();
            this.template = ExportTemplate.basic();
            this.completeHandler = new DefaultExitCompleteHander();
        }

        /**
         * 设置导出目标
         *
         * @param output
         * @return
         */
        public Builder target(SchemaObservableOutput output) {
            this.output = output;
            return this;
        }

        /**
         * 设置导出表的筛选策略
         *
         * @param strategy
         * @return
         */
        public Builder strategy(SchemaTableStrategy strategy) {
            this.strategy = strategy;
            return this;
        }

        /**
         * 设置导出的模板.
         *
         * @param template
         * @return
         */
        public Builder template(ExportTemplate template) {
            this.template = template;
            return this;
        }

        /**
         * 设置RxJava Subscribe.onComplete调用时的回调
         * @param handler
         * @return
         */
        public Builder complete(RxJavaCompleteHandler handler){
            this.completeHandler = handler;
            return this;
        }

        /**
         * 使用指定数据库创建导出执行器
         * MySQL/SQLServer: 数据库名称
         *
         * @param dbName
         * @return
         */
        public ExportReactiveExecutor database(String dbName) {
            // (SchemaAsyncOutput output, SchemaQueryAsyncResult result, SchemaTableStrategy strategy, ExportTemplate template, String target)
            return new ExportReactiveExecutor(output, result, strategy, template, completeHandler, Optional.ofNullable(dbName), Optional.empty());
        }

        /**
         * 使用指定数据库和模式创建导出执行器
         * PostgreSQL
         *
         * @param dbName 数据库名称
         * @param schema 模式名称
         * @return
         */
        public ExportReactiveExecutor database(String dbName, String schema) {
            return new ExportReactiveExecutor(output, result, strategy, template, completeHandler, Optional.ofNullable(dbName), Optional.ofNullable(schema));
        }

        /**
         * 使用指定模式创建导出执行器
         * Oracle
         *
         * @param schema
         * @return
         */
        public ExportReactiveExecutor schema(String schema) {
            return new ExportReactiveExecutor(output, result, strategy, template, completeHandler, Optional.empty(), Optional.ofNullable(schema));
        }
    }
}
